package com.heima.schedule.service.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.heima.common.constants.ScheduleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.List;
import java.util.Set;

/**
 * Delayed-task service that persists tasks through the MyBatis-Plus mappers and stages
 * them in Redis through {@link CacheService}.
 *
 * <p>Redis layout used by this class (both keys are suffixed with
 * {@code <taskType>_<priority>}):
 * <ul>
 *   <li>{@code ScheduleConstants.TOPIC + key} - a list of tasks that are already due and
 *       waiting to be polled by a consumer;</li>
 *   <li>{@code ScheduleConstants.FUTURE + key} - a sorted set of tasks due within the next
 *       {@value #PRELOAD_MINUTES} minutes, scored by execute time in epoch milliseconds.</li>
 * </ul>
 * Tasks due later than that window are kept in the database only until
 * {@link #reloadData()} picks them up.
 */
@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {

    /** Size, in minutes, of the look-ahead window of tasks that are staged in Redis. */
    private static final int PRELOAD_MINUTES = 5;

    /** Expiry, in milliseconds, passed to {@code CacheService.tryLock} by the scheduled jobs. */
    private static final int LOCK_EXPIRE_MILLIS = 1000 * 30;

    @Autowired
    private TaskinfoMapper taskinfoMapper;
    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;

    @Autowired
    private CacheService cacheService;

    /**
     * Adds a delayed task: stores it in the task table and the task-log table, then stages
     * it in Redis if it is due within the look-ahead window.
     *
     * <p>As a side effect the id of the inserted row is written back into {@code task}.
     *
     * @param task the task to schedule; its execute time is in epoch milliseconds
     * @return the id of the persisted task
     */
    @Override
    public long addTask(Task task) {
        // 1. Persist the task itself.
        Taskinfo taskinfo = new Taskinfo();
        BeanUtils.copyProperties(task, taskinfo);
        // Task carries the execute time as epoch millis, Taskinfo as a Date, so the
        // property copy above cannot transfer it.
        taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
        taskinfoMapper.insert(taskinfo);

        // Propagate the id of the inserted row back to the caller's DTO.
        task.setTaskId(taskinfo.getTaskId());

        // Persist the matching log row in its initial state.
        TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
        BeanUtils.copyProperties(taskinfo, taskinfoLogs);
        taskinfoLogs.setVersion(1);
        taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
        taskinfoLogsMapper.insert(taskinfoLogs);

        // 2. Stage the task in Redis.
        addTaskToCache(task);

        return task.getTaskId();
    }

    /**
     * Stages a task in Redis according to its execute time: already-due tasks are pushed
     * onto the consumer list, tasks due within the next {@value #PRELOAD_MINUTES} minutes
     * go into the future sorted set, and anything later is not cached at all.
     *
     * @param task the task to stage; must already carry its task id
     */
    private void addTaskToCache(Task task) {
        String key = cacheKey(task);
        // Upper bound (epoch millis) of the look-ahead window.
        long nextTime = DateUtils.addMinutes(new Date(), PRELOAD_MINUTES).getTime();

        if (task.getExecuteTime() <= System.currentTimeMillis()) {
            // Already due: make it immediately available to consumers.
            cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
        } else if (task.getExecuteTime() <= nextTime) {
            // Due soon: park it in the sorted set, scored by its execute time.
            cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
        }
    }

    /**
     * Cancels a task: deletes its task row, marks its log row as cancelled and removes it
     * from Redis.
     *
     * @param taskId id of the task to cancel
     * @return {@code true} if the task was found and cancelled, {@code false} if no log
     *         row exists for {@code taskId} (in which case nothing is modified)
     */
    @Override
    public boolean cancelTask(long taskId) {
        // Look the log row up first so that an unknown id is reported as "not cancelled"
        // instead of failing with a NullPointerException.
        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
        if (taskinfoLogs == null) {
            log.warn("cancelTask: no task log found for taskId={}, nothing to cancel", taskId);
            return false;
        }

        taskinfoMapper.deleteById(taskId);
        taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);
        taskinfoLogsMapper.updateById(taskinfoLogs);

        // Rebuild the DTO so that its JSON form can be used to locate the Redis entry.
        Task task = toTask(taskinfoLogs, taskinfoLogs.getExecuteTime());
        String key = cacheKey(task);
        String taskJson = JSON.toJSONString(task);

        // Remove from BOTH structures. A task whose execute time has just passed may still
        // sit in the future sorted set until the next refresh() run; removing it only from
        // the list would let refresh() promote it afterwards and the cancelled task would
        // be executed. Removing a value that is not present is a no-op.
        cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, taskJson);
        cacheService.zRemove(ScheduleConstants.FUTURE + key, taskJson);

        return true;
    }

    /**
     * Pulls the next due task for the given type and priority from the consumer list,
     * deletes its task row and marks its log row as executed.
     *
     * @param type     task type
     * @param priority task priority
     * @return the task to execute, or {@code null} if the list is empty or the popped
     *         entry has no log row in the database
     */
    @Override
    public Task pollTask(int type, int priority) {
        // 1. Take the next entry from Redis.
        String key = type + "_" + priority;
        String taskJson = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if (StringUtils.isBlank(taskJson)) {
            return null;
        }

        // 2. Bring the database in line.
        Task task = JSON.parseObject(taskJson, Task.class);
        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(task.getTaskId());
        if (taskinfoLogs == null) {
            // The Redis entry has no database record (Redis is written before the
            // surrounding transaction of addTask commits, so such an entry can exist).
            // Skip it rather than failing with a NullPointerException.
            log.warn("pollTask: no task log found for taskId={}, skipping queue entry", task.getTaskId());
            return null;
        }

        taskinfoMapper.deleteById(task.getTaskId());
        taskinfoLogs.setStatus(ScheduleConstants.EXECUTED);
        taskinfoLogsMapper.updateById(taskinfoLogs);

        // Let the database row overwrite the values parsed from Redis.
        BeanUtils.copyProperties(taskinfoLogs, task);
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());

        return task;
    }

    /**
     * Scheduled job (every minute) that moves tasks that have become due from the future
     * sorted sets to the matching consumer lists.
     *
     * <p>The job only runs when {@code CacheService.tryLock} returns a non-blank token.
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    public void refresh() {
        // A blank token means the lock was not acquired; skip this run.
        String token = cacheService.tryLock("FUTURE_TASK", LOCK_EXPIRE_MILLIS);
        if (StringUtils.isBlank(token)) {
            return;
        }

        log.info("{}执行了定时任务", new Date());

        // Every key of a future sorted set, e.g. future_100_50.
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
        for (String futureKey : futureKeys) {
            // Derive the consumer list key that belongs to this sorted set.
            String topicKey = futureKey.replace(ScheduleConstants.FUTURE, ScheduleConstants.TOPIC);
            // Entries whose score (execute time) is not later than now are due.
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
            if (tasks != null && !tasks.isEmpty()) {
                cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
                log.info("成功的将{}中的需要执行的任务刷到{}中", futureKey, topicKey);
            }
        }
    }

    /**
     * Rebuilds the Redis staging area from the database: deletes every future sorted set
     * and consumer list, then re-stages all task rows whose execute time is earlier than
     * {@value #PRELOAD_MINUTES} minutes from now.
     *
     * <p>Runs every minute via {@code @Scheduled} and additionally once after bean
     * construction via {@code @PostConstruct}. The job only runs when
     * {@code CacheService.tryLock} returns a non-blank token.
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    @PostConstruct
    public void reloadData() {
        // A blank token means the lock was not acquired; skip this run.
        String token = cacheService.tryLock("RELOAD_TASK", LOCK_EXPIRE_MILLIS);
        if (StringUtils.isBlank(token)) {
            return;
        }

        // Drop everything currently staged so the cache can be rebuilt from scratch.
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
        Set<String> topicKeys = cacheService.scan(ScheduleConstants.TOPIC + "*");
        cacheService.delete(futureKeys);
        cacheService.delete(topicKeys);

        // Load every task that falls before the end of the look-ahead window.
        Date nextTime = DateUtils.addMinutes(new Date(), PRELOAD_MINUTES);
        List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery()
                .lt(Taskinfo::getExecuteTime, nextTime));
        if (allTasks == null) {
            return;
        }
        for (Taskinfo taskinfo : allTasks) {
            addTaskToCache(toTask(taskinfo, taskinfo.getExecuteTime()));
        }
    }

    /** Builds the {@code <taskType>_<priority>} suffix shared by the list and sorted-set keys. */
    private static String cacheKey(Task task) {
        return task.getTaskType() + "_" + task.getPriority();
    }

    /**
     * Creates a {@link Task} DTO from a persistent entity, converting the entity's
     * {@link Date} execute time to the epoch-millisecond form used by the DTO.
     */
    private static Task toTask(Object source, Date executeTime) {
        Task task = new Task();
        BeanUtils.copyProperties(source, task);
        task.setExecuteTime(executeTime.getTime());
        return task;
    }

}